import json
import os

from kafka import KafkaAdminClient
from kafka.admin import NewTopic


class KafkaAdminClientConnect:
    """Admin-level connection to a Kafka cluster.

    The bootstrap servers are read from ``config/config.json`` (key
    ``kafka.clusters``), located in the parent of the directory that
    contains this module. The instance can be used as a context manager so
    the underlying admin client is closed when the block exits.
    """

    def __init__(self):
        """
        Load the cluster list from the config file and open the admin client.

        :raises ValueError: if ``kafka.clusters`` is missing or empty
        """
        # Parent of the directory holding this file; the config directory
        # is expected to sit next to this module's package directory.
        dir_path = os.path.split(os.path.split(__file__)[0])[0]
        config_path = os.path.join(dir_path, "config", "config.json")
        self.clusters = self._load_clusters(config_path)
        self.admin = KafkaAdminClient(bootstrap_servers=self.clusters)

    @staticmethod
    def _load_clusters(config_path):
        """
        Read the ``kafka.clusters`` value from a JSON config file.

        :param config_path: path of the JSON config file
        :return: the non-empty ``clusters`` value from the ``kafka`` section
        :raises ValueError: if the ``kafka`` section or its ``clusters``
            entry is missing, null or empty
        """
        # Explicit encoding: without it the file is decoded with the
        # platform locale, which breaks on non-ASCII content on some hosts.
        with open(config_path, "r", encoding="utf-8") as config_file:
            config = json.load(config_file)
        # ``or`` also covers keys that are present but set to null.
        kafka_config = config.get("kafka") or {}
        clusters = kafka_config.get("clusters") or []
        if not clusters:
            raise ValueError("配置文件中clusters为空值")
        return clusters

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the admin client; exceptions are not suppressed."""
        self.close()

    def close(self):
        """Close the underlying admin client connection."""
        self.admin.close()

    def create_topic(self, topic_name, num_partitions=1, replication_factor=1):
        """
        Create a topic.

        :param topic_name: name of the topic to create
        :param num_partitions: number of partitions for the topic
        :param replication_factor: number of replicas for each partition
        :return: the result returned by the admin client's ``create_topics``
        """
        topic = NewTopic(
            name=topic_name,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        return self.admin.create_topics([topic], validate_only=False)

    def delete_topic(self, topic_name):
        """
        Delete a topic.

        :param topic_name: name of the topic to delete
        :return: the result returned by the admin client's ``delete_topics``
        """
        return self.admin.delete_topics([topic_name])
